package com.study.flink.java.day04_async;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

/**
 * Maps a comma-separated activity record to an {@link ActivityBean}, resolving the
 * activity id to its name with a query against the {@code t_activities} MySQL table.
 *
 * <p>Expected record layout: {@code uid,activityId,time,eventType,province}, for example
 * {@code u001,A1,2020-02-26 10:11:12,1,<province name>}.
 *
 * <p>One JDBC connection is opened in {@link #open(Configuration)} and released in
 * {@link #close()}. The lookup in {@link #map(String)} is a blocking query executed
 * once per record.
 */
public class DataToActivityBeanFunction extends RichMapFunction<String, ActivityBean> {

    // NOTE(review): connection settings and credentials are hard-coded; consider
    // supplying them through job configuration instead of source code.
    private static final String JDBC_URL = "jdbc:mysql://node02:3306/bigdata?characterEncoding=UTF-8";
    private static final String JDBC_USER = "linys";
    private static final String JDBC_PASSWORD = "123456";

    private static final String ACTIVITY_NAME_QUERY = "SELECT name FROM t_activities WHERE id = ?";

    /** Number of comma-separated fields that {@link #map(String)} reads from a record. */
    private static final int EXPECTED_FIELD_COUNT = 5;

    /** JDBC connection; transient, so it is created in {@link #open(Configuration)} rather than serialized. */
    private transient Connection conn = null;

    /**
     * Opens the MySQL connection used by the per-record lookups.
     *
     * @param parameters configuration passed through to the superclass
     * @throws Exception if the superclass hook fails or the connection cannot be established
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
    }

    /**
     * Parses one record and enriches it with the activity name from MySQL.
     *
     * @param line a record in the form {@code uid,activityId,time,eventType,province}
     * @return the bean built from the parsed fields; its name is {@code null} when the
     *         query returns no row for the activity id
     * @throws IllegalArgumentException if the record has fewer than five fields, or the
     *         event type is not an integer ({@link NumberFormatException})
     * @throws Exception if the database lookup fails
     */
    @Override
    public ActivityBean map(String line) throws Exception {
        // Echo of every incoming record, kept from the original implementation.
        // NOTE(review): consider replacing with a logger at debug level.
        System.out.println(line);

        String[] fields = line.split(",");
        if (fields.length < EXPECTED_FIELD_COUNT) {
            // Fail with a descriptive message instead of a bare ArrayIndexOutOfBoundsException.
            throw new IllegalArgumentException(
                    "Expected at least " + EXPECTED_FIELD_COUNT + " comma-separated fields but got "
                            + fields.length + ": " + line);
        }

        String uid = fields[0];
        String aid = fields[1];
        String time = fields[2];
        int eventType = Integer.parseInt(fields[3]);
        String province = fields[4];

        String name = lookupActivityName(aid);

        return ActivityBean.of(uid, aid, name, time, eventType, province);
    }

    /**
     * Releases the MySQL connection, if one was opened.
     *
     * @throws Exception if the superclass hook or closing the connection fails
     */
    @Override
    public void close() throws Exception {
        try {
            super.close();
        } finally {
            // conn stays null when open() failed or was never called; guard against an NPE
            // that would otherwise hide the original failure.
            if (conn != null) {
                conn.close();
                conn = null;
            }
        }
    }

    /**
     * Queries {@code t_activities} for the name belonging to the given activity id.
     * The statement and result set are closed on every path, including failures.
     *
     * @param aid the activity id bound to the query parameter
     * @return the name from the last row returned, or {@code null} if there is no row
     */
    private String lookupActivityName(String aid) throws java.sql.SQLException {
        try (PreparedStatement statement = conn.prepareStatement(ACTIVITY_NAME_QUERY)) {
            statement.setString(1, aid);
            try (ResultSet rows = statement.executeQuery()) {
                String name = null;
                // Iterate all rows so the last one wins, matching the original behavior.
                while (rows.next()) {
                    name = rows.getString(1);
                }
                return name;
            }
        }
    }
}
